Python の workflow Package でバッチ Job を管理してみた
こんにちは、みかみです。
Python でバッチ作ってます。Python で Workflow 管理といえば、Airflow とか、Luigi とか?
だけどフレームワークって、便利な反面カスタマイズしにくかったり回りくどくなったりで、バッチ処理くらいだったら全部自分でコーディングできちゃうし。
でも自分で書いてると、だんだん見返すのが億劫なコードになってきて。。。
はじめに
やりたいこと
- バッチ Job の Workflow 管理部を簡単に実装したい
- 普通にコード書いて実装してたら if 文とかですごく見にくくなってきた warkflow 制御部を、誰でもメンテできる可読性の高いコードにしたい
- Python と親和性の高い Job 管理部にしたい(Job を Python で書いてるので
- お手軽に実装したい(GUI やスケジューラー、監視(通知)機能はなくてもいい
動作環境
- OS:Windows10(Mac VMware Fusion)
- Python 3.6.0
やってみた
workflow(2.1.2) を インストール
pip で workflow をインストールしました。
インストール時、2件ほどエラー出ましたが。。
AttributeError: module 'enum' has no attribute 'IntFlag'
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x83 in position 32: invalid start byte
- pipでUnicodeDecodeErrorが発生(Windows環境) | Qiita
- Add one more fail-safe when decoding console to str in Py3 #4310 | GitHub
\Python36\Lib\site-packages\enum
のフォルダ名変更して、pull request の変更反映して、install できました。。
C:\Users\mikami.yuki>pip install workflow Collecting workflow Using cached workflow-2.1.2-py2.py3-none-any.whl (省略) Installing collected packages: dulwich, autosemver, workflow Successfully installed autosemver-0.5.2 dulwich-0.18.2 workflow-2.1.2
先行 Job の実行結果によって次に実行する Job を変える
実行する Job 本体を4つ準備しました。
- Job check:とりあえず、チェック結果(True or False)はコマンドラインのパラメータでもらい、そのまま返します。
- Job A:もらったパラメータを出力して、Job A 実行状態を更新します。
- Job B:もらったパラメータを出力して、Job B 実行状態を更新します。
- Job comp:もらったパラメータを出力します。
def check(obj, eng): print('execute check') return obj.data def job_a(obj, eng): print('execute Job A') print(' Job A {}'.format(obj.job_a)) print(' Job B {}'.format(obj.job_b)) obj.job_a = 'done' return obj def job_b(obj, eng): print('execute Job B') print(' Job A : {}'.format(obj.job_a)) print(' Job B : {}'.format(obj.job_b)) obj.job_b = 'done' return obj def comp(obj, eng): print('execute comp') print(' input : {}'.format(obj.data)) print(' Job A : {}'.format(obj.job_a)) print(' Job B : {}'.format(obj.job_b))
実行するJobは、配列で定義して workflow パッケージに渡します。
制御条件も指定できます。
Job check の結果が True ならば Job Aと B を、False ならば Job A だけを実行するように定義しました。
from workflow.patterns.controlflow import IF_ELSE from job import check, comp, job_a, job_b my_workflow = [ IF_ELSE( check, [ job_a, job_b ], [ job_a ] ), comp ]
次は workflow のインスタンスを作成して実行するメインタスクです。
Job の実行パラメータを Wrapper クラスを介して Job に渡します。
パラメータには Job A と Job B の実行状態も持たせました。
from workflow.engine import GenericWorkflowEngine from my_flow import my_workflow import sys from distutils.util import strtobool class ParamWrapper(object): def __init__(self, data): self.data = data self.job_a = 'yet' self.job_b = 'yet' my_engine = GenericWorkflowEngine() my_engine.callbacks.replace(my_workflow) my_object = ParamWrapper(strtobool(sys.argv[1])) my_engine.process([my_object])
コードの準備はできました。
まずはコマンドラインから True を指定して実行してみます。
Job check の実行結果が True になるはずなので、Job A と Job B が実行されるはずです。
C:\Users\mikami.yuki\work\workflow\src>py task_main.py True execute check execute Job A Job A yet Job B yet execute Job B Job A : done Job B : yet execute comp input : 1 Job A : done Job B : done
Job A、 Job B ともに実行されました。
次はパラメータで False を指定してみます。
Job check の結果が False になるので、Job A しか実行されないはずです。
C:\Users\mikami.yuki\work\workflow\src>py task_main.py False execute check execute Job A Job A yet Job B yet execute comp input : 0 Job A : done Job B : yet
期待通り、Job A だけが実行されました。
Job が失敗したらリトライする
例えばDBコネクトエラーなどでリトライしたいケースを想定。
準備した Job は A, B, C, D の4つ。Job B に処理中断する仕込みを入れました。
コマンドラインでパラメータに True を指定した場合は中断します。
def job_a(obj, eng): print('execute Job A') def job_b(obj, eng): print('execute Job B') if obj.data: print('Raising HaltProcessing') eng.halt('interrupting this workflow.') def job_c(obj, eng): print('execute Job C') def job_d(obj, eng): print('execute Job D')
ワークフロー定義では特に制御条件を入れていません。
from job import job_a, job_b, job_c, job_d my_workflow = [ job_a, job_b, job_c, job_d ]
メインタスクで処理中断を catch して、リトライします。
from workflow.errors import HaltProcessing from workflow.engine import GenericWorkflowEngine from my_flow import my_workflow import sys from distutils.util import strtobool class ParamWrapper(object): def __init__(self, data): self.data = data my_engine = GenericWorkflowEngine() my_engine.callbacks.replace(my_workflow) my_object = ParamWrapper(strtobool(sys.argv[1])) try: my_engine.process([my_object]) except HaltProcessing: for i in range(3): try: my_engine.restart('current', 'current') except HaltProcessing: continue else: break
処理中断の仕込みに引っかからないパラメータ、False を指定した場合は
C:\Users\mikami.yuki\work\workflow\src\halt>py task_main.py False execute Job A execute Job B execute Job C execute Job D
Job はワークフロー定義で指定した通り順番に実行されます。
が、コマンドラインからパラメータに True を指定して、処理を中断させてみると、
C:\Users\mikami.yuki\work\workflow\src\halt>py task_main.py True execute Job A execute Job B Raising HaltProcessing execute Job B Raising HaltProcessing execute Job B Raising HaltProcessing execute Job B Raising HaltProcessing
仕込みを入れた Job B で処理中断した後、メインタスクで指定した3回リトライしてワークフロー終了しました。
リトライで処理が正常復帰したケースを想定して、メインタスクにも仕込みを入れてみます。
(省略) try: my_engine.process([my_object]) except HaltProcessing: for i in range(3): try: my_object.data = False my_engine.restart('current', 'current') except HaltProcessing: continue else: break
実行すると
C:\Users\mikami.yuki\work\workflow\src\halt>py task_main.py True execute Job A execute Job B Raising HaltProcessing execute Job B execute Job C execute Job D
ちゃんと、中断した Job から先のワークフローを実行してくれました。
他にも、正常復帰したら最初の Job からやり直すことや
(省略) try: my_engine.process([my_object]) except HaltProcessing: for i in range(3): try: my_object.data = False my_engine.restart('current', 'first') except HaltProcessing: continue else: break
C:\Users\mikami.yuki\work\workflow\src\halt>py task_main.py True execute Job A execute Job B Raising HaltProcessing execute Job A execute Job B execute Job C execute Job D
正常復帰しない場合に次の Job から継続することも、簡単に実装できました。
(省略) try: my_engine.process([my_object]) except HaltProcessing: for i in range(3): try: my_engine.restart('current', 'current') except HaltProcessing: continue else: break my_engine.restart('current', 'next')
C:\Users\mikami.yuki\work\workflow\src\halt>py task_main.py True execute Job A execute Job B Raising HaltProcessing execute Job B Raising HaltProcessing execute Job B Raising HaltProcessing execute Job B Raising HaltProcessing execute Job C execute Job D
おわりに(まとめ・所感)
- お手軽に実装できました。
- Wrapper 作ったりすれば好きにカスタマイズできそう。
- ドキュメントは少ない?(google 先生に聞いてもあんまりいい答え返ってこない。。
- あるものは使うに限る!(自分でちまちまコーディングするより、やっぱりパッケージ使った方がらくちん